 1.车辆监控之Structured Streaming中Source
   
   Socket source (for testing): 从socket连接中读取文本内容。
   Kafka source: 从Kafka中拉取数据,与0.10 或以上的版本兼容，后面单独整合Kafka
       Socket
	   yum install -y nc
       nc -lk 9999
   
   整合Kafka单独讲解。
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- 指定编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- 指定编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>   
    
package com.lg.test

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 使用结构化流实现从socket读取数据实现单词统计
 */
object WordCount {
  /**
   * Structured Streaming word count (socket source, console sink).
   *
   * Reads text lines from a TCP socket, splits each line into words,
   * and maintains a running count of each word, printed to the console
   * in descending frequency order on every trigger.
   *
   * Start the data source first, e.g.: `nc -lk 9999` on host "linux123".
   */
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession (local mode, all available cores).
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(WordCount.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // 2. Attach to the socket source; each received line becomes one row.
    val df: DataFrame = spark.readStream
      .option("host", "linux123")
      .option("port", "9999")
      .format("socket")
      .load()

    // 3. Split each line into words.
    import spark.implicits._
    val ds: Dataset[String] = df.as[String]
    // BUG FIX: original used split(""), which splits a line into individual
    // CHARACTERS, so the job counted characters instead of words. Split on
    // whitespace as the surrounding comments intend.
    val wordDs: Dataset[String] = ds.flatMap(_.split("\\s+"))

    // 4. Aggregate: running count per word, most frequent first.
    //    ("value" is the default column name of a Dataset[String].)
    val res: Dataset[Row] = wordDs.groupBy("value").count().sort($"count".desc)

    // 5. Sink: print the complete result table to the console each trigger.
    res.writeStream
      .format("console")                    // debug sink: write to stdout
      .outputMode("complete")               // emit the entire aggregated table
      .trigger(Trigger.ProcessingTime(0))   // trigger as fast as possible
      .start()
      .awaitTermination()                   // block until the query stops
  }
}


 2.计算
   
   获得到Source之后的基本数据处理方式和之前学习的DataFrame、DataSet一致，不再赘述
   官网示例代码
case class DeviceData(device: String, deviceType: String, signal: Double, time:
DateTime)
val df: DataFrame = ...
val ds: Dataset[DeviceData] = df.as[DeviceData]  
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")    // using untyped APIs 
ds.filter(_.signal > 10).map(_.device)     // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()         // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))   // using typed API

 3.输出
   
   计算结果可以选择输出到多种设备并进行如下设定
   1、 output mode：以哪种方式将result table的数据写入sink
   2、 format/output sink的一些细节：数据格式、位置等。
   3、 query name：指定查询的标识。类似tempview的名字
   4、 trigger interval：触发间隔，如果不指定，默认会尽可能快速地处理数据
   5、 checkpoint地址：一般是hdfs上的目录。注意：Socket不支持数据恢复，如果设置了，第二次启动会
报错 ,Kafka支持
   
   1).output mode
   每当结果表更新时，我们都希望将更改后的结果行写入外部接收器。
   这里有三种输出模型:
   (1).Append mode:默认模式，新增的行才输出，每次更新结果集时，只将新添加到结果集的结果行输出
到接收器。仅支持那些添加到结果表中的行永远不会更改的查询。因此，此模式保证每行仅输出一次。
例如，仅查询select，where，map，flatMap，filter，join等会支持追加模式。不支持聚合
   (2).Complete mode: 所有内容都输出，每次触发后，整个结果表将输出到接收器。聚合查询支持此功
能。仅适用于包含聚合操作的查询。
   (3).Update mode:更新的行才输出，每次更新结果集时，仅将被更新的结果行输出到接收器(自Spark
2.1.1 起可用)，不支持排序
   2).output sink
   File sink - Stores the output to a directory.支持parquet文件,以及append模式
writeStream
 .format("parquet")   // can be "orc", "json", "csv", etc.
 .option("path", "path/to/destination/dir")
 .start()

   Kafka sink - Stores the output to one or more topics in Kafka.
writeStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("topic", "updates")
 .start()
 
   Foreach sink - Runs arbitrary computation on the records in the output. See later in the section
for more details.
writeStream
 .foreach(...)
 .start()   
   
   Console sink (for debugging) - Prints the output to the console/stdout every time there is a
trigger. Both, Append and Complete output modes, are supported. This should be used for
debugging purposes on low data volumes as the entire output is collected and stored in the
driver’s memory after every trigger.
writeStream
 .format("console")
 .start()
 
   Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both,
Append and Complete output modes, are supported. This should be used for debugging
purposes on low data volumes as the entire output is collected and stored in the driver’s
memory. Hence, use it with caution.
writeStream
 .format("memory")
 .queryName("tableName")
 .start()
 
   官网示例代码
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10") 
// Print new data to console
noAggDF
 .writeStream
 .format("console")
 .start()
// Write new data to Parquet files
noAggDF
 .writeStream
 .format("parquet")
 .option("checkpointLocation", "path/to/checkpoint/dir")
 .option("path", "path/to/destination/dir")
 .start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
 .writeStream
 .outputMode("complete")
 .format("console")
 .start()
// Have all the aggregates in an in-memory table
aggDF
 .writeStream
 .queryName("aggregates")   // this query name will be the table name
 .outputMode("complete")
 .format("memory")
 .start()
spark.sql("select * from aggregates").show()  // interactively query in-memory table